/**   
 * @Title: WorkerManagerImpl.java 
 * @Package net.gdface.service.search 
 * @Description: TODO 
 * @author guyadong   
 * @date 2015年5月5日 下午5:54:26 
 * @version V1.0   
 */
package net.gdface.worker;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import net.gdface.utils.Assert;
import net.gdface.utils.Judge;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 工作线程管理器<br>
 * 实现固定线程数的线程管理:<br>
 * 在线程池被{@link #shutdown()}之前，始终维持固定数量({@link #getCorePoolSize()})的任务线程<br>
 * 实现任务对象 {@link #workerClass}的复用:<br>
 * 除了{@link #startManager()}开始时创建任务对象，线程生命全过程，不再创建新的任务对象。<br>
 * 在线程池中结束的任务对象会被监视线程({@link #monitorThread})立即重新提交执行<p>
 * 参见{@link ThreadPoolExecutor}
 * @author guyadong
 *
 */
public class WorkerManagerImpl<T extends WorkData> extends ThreadPoolExecutor implements WorkerManager {
	private static final Logger logger = LoggerFactory.getLogger(WorkerManagerImpl.class);
	/**
	 * 保存所有正在运行的{@link Runnable}对象对应的数据{@link Future}<br>
	 * 任务被{@link #submit(Runnable)}提交时，会自动将{@link Future}加入MAP,以保存对应{@link Runnable}<br>
	 * 同一个{@link Runnable}允许被提交多次生成不同的{@link Future}，被不同的线程同时执行,<br>
	 * 所以需要以{@link Future}为key的{@link ConcurrentMap}来保存{@link Runnable},{@link Runnable}可重复<br>
	 * {@link #monitorThread}线程会监视所有运行的{@link Future}<br>
	 * {@link Future}一旦结束，就被{@link #removeAndResubmit(Future, boolean)}从{@link #tasks}中清除掉,<br>
	 * 其对应的{@link Runnable}对象会被重新提交,生成新的{@link Future}加入 {@link #tasks}
	 */
	private final ConcurrentMap<Future<?>,Runnable> tasks;
	/**
	 * 完成任务队列<br>
	 * {@link #afterExecute(Runnable, Throwable)}钩子方法会在任务结束时将{@link Runnable}加入队列<br>
	 * {@link #monitorThread}线程会从队列中取出{@link Runnable}处理<br>
	 */
	private final BlockingQueue<Runnable> finishedWorkers;
	/**
	 * 任务对象的类({@link Class})对象，实现{@link Runnable}接口
	 */
	private final Class<? extends Runnable> workerClass;
	/**
	 * 队列管理器
	 */
	private final QueueManager<T> queueManager;
	/**
	 * 监视线程的检查间隔时间(毫秒)
	 */
	private final long checkIntervalMills;
	/**
	 * 全局结束标志
	 */
	private final AtomicBoolean isFinished=new AtomicBoolean(false);
	/**
	 * 工作管理器的名字，在实例化对象时，可以指定该名字
	 */
	private String name="WM";
	/**
	 * 等待计数器,默认为{@link Long#MAX_VALUE}
	 */
	private long waitCount = Long.MAX_VALUE;
	/**
	 * 监视线程
	 */
	private Thread monitorThread;
	protected WorkerManagerImpl(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> workQueue, Class<? extends Runnable> workerClass, QueueManager<T> queueManager,
			long checkInterval, String name) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
		Assert.notNull(workerClass, "workerClass");
		Assert.notNull(queueManager, "queueManager");
		this.tasks=new  ConcurrentHashMap<Future<?>,Runnable>(512,0.75f);
		this.finishedWorkers = new LinkedBlockingQueue<Runnable>();
		this.workerClass = workerClass;
		this.queueManager = queueManager;
		this.checkIntervalMills = unit.convert(checkInterval, TimeUnit.MILLISECONDS);
		if(!Judge.isEmpty(name))
			this.name=name;
		logger.info("Thread Pool [{}] size(线程池大小)=[{}]", this.name,corePoolSize);

	}
	
	@Override
	protected void afterExecute(Runnable r, Throwable t) {
		// 将结束的任务对象加入结束任务队列，激活监视线程
		this.finishedWorkers.add(r);
	}

	@Override
	public Thread startManager() {
		//不允许重复启动
		if(null!=this.monitorThread&&this.monitorThread.isAlive())
			throw new RuntimeException("startManager can't be recall");
		try {
			// 创建指定指定数目的任务对象,并提交线程池执行
			for (int i = 0; i < this.getCorePoolSize(); i++) {
				Runnable r = workerClass.newInstance();
				this.tasks.put(submit(r),r);
			}
			return startMonitorThread();
		} catch (InstantiationException e) {
			logger.error(e.getMessage(), e);
			throw new RuntimeException(e);
		} catch (IllegalAccessException e) {
			logger.error(e.getMessage(), e);
			throw new RuntimeException(e);
		}
	}

	/**
	 * 从 {@link #tasks}中移除已经完成的任务对象{@link Future}<br>
	 * 当{@code resubmit}为{@code true}时会将结束的任务对象{@link Runnable}重新提交运行
	 * @param task 任务对象
	 * @param resubmit 是否将结束的任务对象重新提交运行
	 * @throws IllegalStateException  {@code task}不是已经完成或取消的任务
	 * @see #resubmit(Runnable)
	 */
	private void removeAndResubmit(Future<?> task, boolean resubmit) throws IllegalStateException {
		if (task.isCancelled() || task.isDone()) {
			try {
				task.get();// 通过调用get()方法，确保任务对象task在被重新提交前上次，上次任务执行结束
			} catch (ExecutionException e) {
				logger.error(e.getCause().getMessage(), e.getCause());
			} catch (InterruptedException e) {
				throw new RuntimeException(e);
			} finally {
				// 将任务从map中移除
				Runnable r=this.tasks.remove(task);
				if (resubmit) {
					tasks.put(submit(r), r);
				}
			}
		}else
			throw new IllegalStateException("未结束的任务");
	}
	
	private Thread startMonitorThread() {
		this.monitorThread = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					logger.info(String.format("[%s]线程启动", Thread.currentThread().getName()));
					Future<?> f;
					int lastCount = Integer.MAX_VALUE;
					long wait = Long.MAX_VALUE;
					long lastPrintTimeMillis=0L;
					while (true) {
						try {
							// 从结束任务队列中取出一个任务对象
							boolean close = false;
							if (null != (f = (Future<?>) finishedWorkers.poll(checkIntervalMills, TimeUnit.MILLISECONDS))) {
								removeAndResubmit(f, !(close = (isFinished.get() && 0 == queueManager.queueSize())));
							}
							int c = Integer.MAX_VALUE;
							// 定时打出状态信息
							if (null == f || (System.currentTimeMillis() - lastPrintTimeMillis) >= checkIntervalMills) {
								lastPrintTimeMillis = System.currentTimeMillis();
								c = getActiveCount();
								System.out.printf("[%s]%dT%dW%dQ%dR%dF", Thread.currentThread().getName(), c,
										tasks.size(), queueManager.queueSize(), WorkData.WOKING_DATA.size(),Runtime.getRuntime().freeMemory()>>20);
							}
							// 队列中没有数据了，而且isFinished结束标志为true时，开始监视线程的结束过程
							if (close) {
								if (getQueue().isEmpty()) {
									if (Integer.MAX_VALUE == lastCount) {
										// 初始化关闭计数器，仅一次
										shutdown();// 关闭线程池,不再接受新任务
										lastCount = c;
										logger.info(String.format("[%s]线程池关闭就绪", name));
									}
									if (0 == c)// 没有执行任务的线程了,结束监视线程
										break;
									else if (c < lastCount) {
										// 如果活动线程数一直在减少，就重置wait计数器，耐心等待
										lastCount = c;
										wait = waitCount;
										logger.debug(String.format("[%s]线程关闭计数器复位[%d]", name, wait));

									} else {
										// 如果活动线程没有变少，wait计数器减1，减到0就结束线程
										if (Long.MAX_VALUE == wait)
											wait = waitCount;
										if (--wait > 0) {
											logger.debug(String.format("[%s]线程关闭倒计数:%d/%d,活动线程[%s]", name, wait,
													waitCount, c));
											// logger.warn(String.format("%d thread in pool cant finished byself", c));
										} else {
											logger.debug(String.format("[%s]线程强行退出,线程池中还有活动线程数%d", name, c));
											break;
										}
									}
								}
							}
						} catch (InterruptedException e) {
							throw e;
						} catch (RuntimeException e) {
							logger.error(e.getMessage(), e);
						} finally {
						}
					}					
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				} catch(Throwable e){
					logger.error(e.getMessage(),e);
					System.exit(1);
				} finally {
					shutdownNow();
					logger.info(String.format("[%s]线程结束", Thread.currentThread().getName()));
				}
			}

		}, this.name);
		this.monitorThread.start();
		return this.monitorThread;
	}

	/* （非 Javadoc）
	 * @see net.gdface.worker.WorkerManager#stopManager(long, java.util.concurrent.TimeUnit)
	 */
	@Override
	public void stopManager(long waitTime, TimeUnit unit) throws InterruptedException {
		logger.info(String.format("设置结束标志，等待[%s]监视线程结束",this.monitorThread.getName()));
		this.isFinished.set(true);
		this.waitCount = (unit.convert(waitTime, TimeUnit.MILLISECONDS) +checkIntervalMills-1)/ checkIntervalMills;
		this.monitorThread.join();
	}
}
